package org.ghost.springboot2.demo.controller;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.ghost.springboot2.demo.common.util.ChineseNameUtil;
import org.ghost.springboot2.demo.common.util.JacksonUtil;
import org.ghost.springboot2.demo.config.RocketMqConfig;
import org.ghost.springboot2.demo.dto.OrderMessageDTO;
import org.ghost.springboot2.demo.dto.RspDTO;
import org.ghost.springboot2.demo.service.IDefaultProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * @author Administrator
 */
@Slf4j
@RequestMapping("/defaultProducer")
@RestController
public class DefaultProducerController extends BaseController {

    @Autowired
    private IDefaultProducerService defaultProducerService;

    @RequestMapping(value = "syncSend", method = RequestMethod.POST)
    public RspDTO syncSend(@RequestParam(required = false) String msg) {
        try {
            SendResult sendResult = this.defaultProducerService.syncSend(RocketMqConfig.TOPIC_ORDER, "", msg);
            return this.render(sendResult);
        } catch (RemotingException | MQClientException | MQBrokerException | InterruptedException e) {
            log.error("*****DefaultProducerController.send出错:" + e.getMessage(), e);
            return this.render(e.hashCode() + "", e.getMessage());
        }
    }

    @RequestMapping(value = "asyncSend", method = RequestMethod.POST)
    public RspDTO asyncSend(@RequestParam(required = false) String msg) {
        try {
            this.defaultProducerService.asyncSend(RocketMqConfig.TOPIC_ORDER, "", msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("*****DefaultProducerController.asyncSend,msg={},结果={}", msg, sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    log.error("*****DefaultProducerController.asyncSend出现异常:" + e.getMessage() + ",msg=" + msg, e);
                }
            });
            return this.render();
        } catch (RemotingException | MQClientException | InterruptedException e) {
            log.error("*****DefaultProducerController.send出错:" + e.getMessage(), e);
            return this.render(e.hashCode() + "", e.getMessage());
        }
    }

    @RequestMapping(value = "syncSendOrder", method = RequestMethod.POST)
    public RspDTO syncSendOrder(@RequestParam(required = false) @Max(1000) @Min(1) @NotNull Integer num) {
        long currentTime = System.currentTimeMillis();
        List<OrderMessageDTO> orderMessageDTOList = IntStream.rangeClosed(1, num)
                .mapToObj(it -> {
                    OrderMessageDTO orderMessageDTO = new OrderMessageDTO();
                    orderMessageDTO.setOrderId(currentTime * 1000 + it);
                    orderMessageDTO.setBuyer(ChineseNameUtil.getChineseName());
                    orderMessageDTO.setNum(RandomUtils.nextInt(1, 100));
                    return orderMessageDTO;
                }).collect(Collectors.toList());


        //保证同一个订单的几个步骤进入同一个队列，严格先后顺序
        List<String> tagList = Arrays.asList("下单", "付款", "发货", "收货", "评价");
        tagList.stream().filter(StringUtils::isNotBlank)
                .forEach(tag -> {
                    orderMessageDTOList.parallelStream()
                            .filter(Objects::nonNull)
                            .forEach(it -> {
                                try {
                                    String msg = JacksonUtil.useDefaultMapper().toJson(it);
                                    SendResult sendResult = this.defaultProducerService.syncSend(RocketMqConfig.TOPIC_ORDER, tag, msg, new MessageQueueSelector() {
                                        @Override
                                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                                            Long orderId = (Long) arg;
                                            int index = (int) (orderId % mqs.size());
                                            return mqs.get(index);
                                        }
                                    }, it.getOrderId());
                                    log.info("*****DefaultProducerController.syncSendOrder发送msg={},tasg={},结果={}", it, tag, sendResult);
                                } catch (Exception e) {
                                    log.error("*****DefaultProducerController.syncSendOrder发送异常msg={},tags={},error={}", it, tag, e);
                                }
                            });
                });

        return this.render();
    }
}
